home *** CD-ROM | disk | FTP | other *** search
/ Skunkware 5 / Skunkware 5.iso / src / Games / ms-0.07 / lib / work.c < prev    next >
Encoding:
C/C++ Source or Header  |  1995-06-27  |  17.3 KB  |  722 lines

  1. /* work.c -- MandelSpawn work distribution */
  2.  
  3. /*
  4.   Queue requests for calculation, possibly from different clients
  5.   (such as multiple windows), distribute them among the computation
  6.   servers and return the results to the client that requested them.
  7.  
  8.   See work.h for some kind of interface definition. 
  9. */
  10.  
  11. /*  
  12.     This file is part of MandelSpawn, a network Mandelbrot program.
  13.  
  14.     Copyright (C) 1990-1993 Andreas Gustafsson
  15.  
  16.     MandelSpawn is free software; you can redistribute it and/or modify
  17.     it under the terms of the GNU General Public License, version 1,
  18.     as published by the Free Software Foundation.
  19.  
  20.     MandelSpawn is distributed in the hope that it will be useful,
  21.     but WITHOUT ANY WARRANTY; without even the implied warranty of
  22.     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  23.     GNU General Public License for more details.
  24.  
  25.     You should have received a copy of the GNU General Public License,
  26.     version 1, along with this program; if not, write to the Free 
  27.     Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  28. */
  29.  
  30.  
  31. #include <stdio.h>
  32. #include <string.h>
  33. #include <errno.h>    /* pre-X11R4 systems need this for EWOULDBLOCK */
  34.  
  35. #ifdef NO_STRDUP
  36. char *strdup();
  37. #endif
  38.  
  39. /*
  40.   It is impossible to declare malloc() in a portable way.
  41.   Be prepared to change these declarations. 
  42. */
  43. void *malloc(), *realloc(); 
  44. void free();
  45.  
  46. #include "datarep.h"
  47. #include "ms_ipc.h"
  48. #include "io.h"
  49. #include "work.h"
  50.  
  51. /* This is for Linux */
  52. #ifndef FNDELAY
  53. #ifdef O_NDELAY
  54. #define FNDELAY O_NDELAY
  55. #endif
  56. #endif
  57.  
  58. #ifdef NO_BCOPY
  59. #define bcopy(s,d,n) memcpy(d,s,n)
  60. #endif
  61.  
  62. typedef struct slave
  63. { char *name_string;        /* machine name of the slave */
  64.   NET_ADDRESS name;         /* network address of the slave */
  65.   int has_timeout;         /* the timeout has been initialized */
  66.   unsigned timeout;        /* timeout in milliseconds */
  67. #ifndef OLD_TIMEOUT
  68.   long timeout_at;        /* when to timeout (seconds since epoc) */
  69. #else
  70.   char *timer_id;         /* timeout id (for removing the timeout) */
  71. #endif
  72.   unsigned int n_timeouts;     /* how many times the slave has timed out */
  73.   unsigned int n_packets;    /* number of packets that have arrived */
  74.   unsigned int n_late_packets;    /* number of packets that arrived too late */
  75.   unsigned long mi_count;     /* number of iterations done by this slave */
  76.   unsigned int no;        /* slave serial number */
  77.   struct wf_state *backptr;    /* back pointer to the wf_state */
  78.   int disabled;            /* slave disabled due to error */
  79. } slave;
  80.  
  81. typedef struct chunk
  82. { struct chunk *next;
  83.   struct chunk *prev;
  84.   int drawn;            /* true if at least one reply has arrived */
  85.   char *client;         /* pointer to widget owning this chunk */
  86.   unsigned int no;         /* serial number within current sequence */
  87.   char *client_data;        /* client data (unknown size) */
  88.   char *slave_data;        /* slave data */
  89.   unsigned int slave_datalen;    /* length of slave data */
  90. } chunk;
  91.  
  92. struct wf_state
  93.   io_state *io;            /* pointer to I/O object */
  94.   int sequence;            /* current sequence number */
  95.   int pid;            /* pid of this process */
  96.   unsigned n_slaves;         /* number of slaves */
  97.   struct slave **slaves;    /* array of pointers to slave descriptors */
  98.   int n_chunks;            /* number of chunks in the active sequence */
  99.   unsigned int max_chunks;     /* current size of chunk index */
  100.   struct chunk **chunks;     /* pointer to the chunk index */
  101.   struct chunk to_draw;     /* head of queue of chunks to be drawn */
  102.   struct chunk drawn;        /* head of queue of chunks already drawn */
  103.   struct chunk *insert_point;    /* point in queue for inserting new work */
  104. };
  105.  
  106. /* forward refs */
  107. static void handle_reply_msg(), whip_slave(), timeout_set(), timeout_unset();
  108.  
  109. /* names of files containing server hostnames */
  110. #define PERSONAL_SLAVEFILE ".mslaves" 
  111. #ifndef PUBLIC_SLAVEFILE
  112. #define PUBLIC_SLAVEFILE "/usr/local/etc/mslaves"
  113. #endif
  114.  
  115. #define SERVER_PROG    "mslaved"
  116.  
  117. #define INITIAL_CHUNKS 1024
  118.  
  119. #define MAX_WORKPACKET_SIZE 64
  120. #define DATAGRAM_BYTES 1200
  121.  
  122. char *getenv();
  123.  
  124. #ifdef HAVE_SOCKETS
  125.  
  126. /*
  127.   Get the address of a host given either the host name or the address
  128.   in nnn.nnn.nnn.nnn notation 
  129. */
  130.  
  131. struct hostent *gethostbyname();
  132.  
  133. static struct in_addr *
  134. gethostaddrbywhatever(p)
  135.      char *p;
  136. { struct hostent *hp;
  137.  
  138.   /* statically allocated return value buffer */
  139.   /* (this is ugly, but gethostbyname() is no better) */
  140.   static struct in_addr nobyinaddr;
  141.  
  142.   if ((hp = gethostbyname(p)) != NULL)
  143.   { return((struct in_addr *) (hp->h_addr));
  144.   }
  145.   else
  146.   { /* try it as a number nnn.nnn.nnn.nnn */
  147.     if ((nobyinaddr.s_addr = inet_addr(p)) == -1)
  148.     { return(0);
  149.     }
  150.     return(&nobyinaddr);
  151.   }
  152. }
  153.  
  154.  
  155. /* 
  156.   Null-terminate the current field in a whitespace-separated list
  157.   and return a pointer to the next field or NULL if there is no
  158.   next field.  Text after a hash sign is taken as comment and ignored.
  159. */
  160.  
  161. static char *
  162. next_field(p) char *p;
  163. { while(1)
  164.   { register char c = *p;
  165.     switch(c)
  166.     {
  167.     case ' ':
  168.     case '\t':
  169.       *p = '\0';
  170.       while(1)
  171.       { int c = *++p;
  172.     if(c == '\n' || c == '\0') return(0);
  173.     if(c != ' ' && c != '\t') break;
  174.       }
  175.       return(p);
  176.     case '\0':
  177.       return(0);
  178.     case '\n':
  179.     case '#':
  180.       *p = '\0';
  181.       return(0);
  182.     default:
  183.       break;
  184.     }
  185.     p++;
  186.   }
  187. }
  188. #endif /* HAVE_SOCKETS */
  189.  
  190. slave *new_slave(wf, i)
  191.   wf_state *wf;
  192.   int i;
  193. { slave *s = (slave *) malloc(sizeof(slave));
  194.   s->mi_count = 0L;
  195.   s->has_timeout = 0;
  196.   s->n_timeouts = s->n_packets = s->n_late_packets = 0;
  197.   s->disabled = 0;
  198.   s->no = i;
  199.   s->backptr = wf;
  200.   return(s);
  201. }
  202.  
  203. /* Symbolic names for the ends of a pipe */
  204. #define READ 0
  205. #define WRITE 1
  206.  
  207. /* Initialize the workforce */
  208.  
  209. wf_state *
  210. wf_init(timeout, pipe_mux, socket_mux)
  211.     unsigned timeout;
  212.     io_multiplex pipe_mux, socket_mux;
  213. { FILE *f;
  214.   int i;
  215.   char *filename, *home;
  216.   unsigned int size = 16; /* initial size of slave table */
  217.   char buf[256];
  218.   wf_state *wf = (wf_state *) malloc(sizeof(wf_state));
  219.  
  220.   /* general initialization: */
  221.   
  222.   /* set up the chunk index */
  223.   wf->max_chunks = INITIAL_CHUNKS;
  224.   wf->n_chunks = 0;
  225.   wf->chunks = (chunk **) malloc(wf->max_chunks * sizeof(chunk *));
  226.   
  227.   /* set up the chunk queues */
  228.   wf->to_draw.prev = wf->to_draw.next = &wf->to_draw;
  229.   wf->drawn.prev = wf->drawn.next = &wf->drawn;
  230.  
  231.   wf->pid = getpid();
  232.   wf->sequence = 0;
  233.  
  234. #ifdef HAVE_SOCKETS  
  235.   /* .mslaves file stuff */
  236.   wf->slaves = (slave **) malloc(size*sizeof(slave *));
  237.   home = getenv("HOME");
  238.   if(!home)
  239.     wf_error("HOME not set");
  240.   filename = malloc((unsigned)(strlen(home)+1+strlen(PERSONAL_SLAVEFILE)+1));
  241.   strcpy(filename, home); 
  242.   strcat(filename, "/");
  243.   strcat(filename, PERSONAL_SLAVEFILE);
  244.   f = fopen(filename, "r");
  245.   free(filename);
  246.   if(!f)
  247.   { f = fopen(PUBLIC_SLAVEFILE, "r");
  248.   }
  249.   if(!f) /* no .mslaves file */
  250. #endif /* HAVE_SOCKETS */
  251.   { slave *s;
  252.     int pipe_from_server[2];
  253.     int pipe_to_server[2];
  254. #ifdef HAVE_SOCKETS    
  255.     wf_warn("No .mslaves file found, reverting to single-CPU mode");
  256. #endif /* HAVE_SOCKETS */
  257.     wf->slaves = (slave **) malloc(sizeof(slave *));
  258.     s = new_slave(wf, 0);
  259.     /* s->name is not used */
  260.     s->name_string = "localhost";
  261.     s->timeout = timeout;
  262.     wf->slaves[0] = s;
  263.     wf->n_slaves = 1;
  264.  
  265.     pipe(pipe_to_server);
  266.     pipe(pipe_from_server);
  267.   
  268.     if(! fork())
  269.     { /* child */
  270.       close(0); dup(pipe_to_server[READ]);
  271.       close(1); dup(pipe_from_server[WRITE]);
  272.       close(pipe_to_server[WRITE]);
  273.       close(pipe_from_server[READ]);
  274.       execlp(SERVER_PROG, SERVER_PROG, "-p", "-t0", (char *) 0);
  275.       wf_error("could not exec server program");
  276.     }
  277.  
  278.     wf->io = io_init(
  279.              IO_TRANS_PIPE,
  280.              pipe_mux,
  281.              pipe_from_server[READ], pipe_to_server[WRITE],
  282.              (char *) wf,
  283.              /* this allocates space according to MAX_DATAGRAM_SIZE */
  284.              malloc(sizeof(Message)), sizeof(Message),
  285.              handle_reply_msg, wf_tick);
  286.     close(pipe_to_server[READ]);
  287.     close(pipe_from_server[WRITE]);
  288.   }
  289. #ifdef HAVE_SOCKETS  
  290.   else
  291.   { /* found a .mslaves file */
  292.     i = 0;
  293.     while(1)
  294.     { char *p; /* points to current field in .mslaves line */
  295.       char *q; /* points to next field in .mslaves line */
  296.       slave *s;
  297.       struct in_addr *ina;
  298.       unsigned port;
  299.       
  300.       if(!fgets(buf, sizeof(buf), f))
  301.     break;
  302.       if(buf[0] == '\n' || buf[0] == '#')
  303.     continue;
  304.       p = buf;
  305.       q = next_field(p);
  306.       ina = gethostaddrbywhatever(p);
  307.       
  308.       if(ina == 0)
  309.       {    static char warn[] = "unknown host in .mslaves, ignored: ";
  310.     /* trying to keep up with GNU coding standards... */
  311.     char *msg = malloc(sizeof(warn)+strlen(p));
  312.     strcpy(msg, warn);
  313.     strcat(msg, p);
  314.     wf_warn(msg);
  315.     free(msg);
  316.     continue;
  317.       }
  318.       
  319.       p = q;
  320.       port = DEFAULT_PORT;
  321.       if(p) /* there is a "port" field */
  322.       {    q = next_field(p);
  323.     port = (unsigned) atoi(p);
  324.     if(port == 0) /* probably not an integer, and port 0 is bad anyway */
  325.     { static char warn[] = "bad port field in .mslaves, machine ignored: ";
  326.       char *msg = malloc(sizeof(warn)+strlen(p));
  327.       strcpy(msg, warn);
  328.       strcat(msg, p);
  329.       wf_warn(msg);
  330.       free(msg);
  331.       continue;
  332.     }
  333.       }
  334.       
  335.       if(q)
  336.       {    wf_warn("trailing junk in .mslaves");
  337.       }
  338.       
  339.       if(i>=size) 
  340.       {    size *= 2;
  341.     wf->slaves =
  342.       (slave **) realloc((char *) wf->slaves, size*sizeof(slave *));
  343.       }
  344.       s = new_slave(wf, i);
  345.       bcopy((char *) ina, (char *) &s->name.sin_addr, sizeof(struct in_addr));
  346.       s->name.sin_family = AF_INET;
  347.       s->name.sin_port = htons(port);
  348.       s->name_string = strdup(buf);
  349.       s->timeout = timeout;
  350.       wf->slaves[i] = s;
  351.       i++;
  352.     }
  353.     wf->n_slaves = i;
  354.   }
  355.  
  356.   /* make a socket for communicating with the slaves */
  357.   { int sock;
  358.     if((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) 
  359.       wf_error("socket");
  360.     /* make sure we don't block reading the socket */  
  361.     if(fcntl(sock, F_SETFL, FNDELAY) == -1)
  362.       wf_error("unblocking socket");
  363.     wf->io = io_init(IO_TRANS_UDP,
  364.              socket_mux,
  365.              sock, sock, (char *) wf,
  366.              /* this allocates space according to MAX_DATAGRAM_SIZE */
  367.              malloc(sizeof(Message)), sizeof(Message),
  368.              handle_reply_msg, wf_tick);
  369.   }
  370. #endif /* HAVE_SOCKETS */
  371.   return(wf);
  372. }
  373.  
  374.  
  375. /*
  376.   Delete the chunk "c" from anywhere in the queue.  Don't deallocate
  377.   the chunk yet because duplicated messages may still refer to it.  
  378. */
  379.  
  380. static void
  381. queue_delete(c)
  382.      chunk *c;
  383. { c->next->prev = c->prev;
  384.   c->prev->next = c->next;
  385.   c->next = c->prev = NULL; /* just for easier debugging */
  386. }
  387.  
  388.  
  389. /* Add a chunk to the tail of the queue. */
  390.  
  391. static void
  392. queue_add(q, c) chunk *q; chunk *c;
  393. { c->prev = q->prev;
  394.   c->next = q;
  395.   q->prev->next = c;
  396.   q->prev = c;
  397. }
  398.  
  399.  
  400. /* Test the emptiness of a chunk queue. */
  401.  
  402. static int
  403. queue_empty(q)
  404.      chunk *q;
  405. { return(q->next==q && q->prev==q);
  406. }
  407.  
  408.  
  409. /* Return the head of a chunk queue. */
  410.  
  411. static chunk *
  412. queue_head(q)
  413.      chunk *q;
  414. { return(q->next);
  415. }
  416.  
  417.  
  418. void wf_timed_out(client_data)
  419.   char *client_data;
  420. { slave *s = (slave *) client_data;
  421.   s->has_timeout = 0;
  422.   s->n_timeouts++;
  423.   whip_slave(s->backptr, s);
  424. }
  425.  
  426. /* Set a timeout for a slave; this is done when the slave is whipped. */
  427.  
  428. static void
  429. timeout_set(s)
  430.      slave *s;
  431. {
  432. #ifndef OLD_TIMEOUT
  433.   s->timeout_at = time((long *)0) + s->timeout / 1000;
  434. #else
  435.   /* first make sure there isn't a timeout already */
  436.   timeout_unset(s);
  437.   s->timer_id=wf_add_timeout(s->timeout, (char *) s);
  438. #endif
  439.   s->has_timeout=1;
  440. }
  441.  
  442.  
  443. /* Remove a timeout. */
  444.  
  445. static void 
  446. timeout_unset(s)
  447.      slave *s;
  448. {
  449. #ifdef OLD_TIMEOUT  
  450.   if(s->has_timeout)
  451.     wf_remove_timeout(s->timer_id);
  452. #endif
  453.   s->has_timeout=0;
  454. }
  455.  
  456.  
  457. /* Stop the slaves. */
  458.  
  459. static void
  460. stop_slaves(wf)
  461.     wf_state *wf;
  462. { int i;
  463.   for(i=0; i<wf->n_chunks; i++)
  464.   { chunk *c = wf->chunks[i];
  465.     free((char *) (c->client_data));
  466.     free((char *) (c->slave_data));
  467.     free((char *) c);
  468.   }
  469.   /* don't shrink the chunk index array; we probably need it again */
  470.   wf->n_chunks=0;
  471.   wf->sequence++;
  472.   wf->drawn.prev=wf->drawn.next= &wf->drawn;
  473. }
  474.  
  475.  
  476. /* Put the specified slave to work (if there is any) */
  477.  
  478. static void 
  479. whip_slave(wf, s)
  480.   wf_state *wf;
  481.   slave *s;
  482. { struct
  483.   { WhipMessage m;
  484.     char data[MAX_WORKPACKET_SIZE];
  485.   } mm; /* buffer for building the message to send */
  486.   chunk *c;
  487.   
  488.   if(s->disabled || queue_empty(&wf->to_draw))
  489.     return;
  490.  
  491.   c = queue_head(&wf->to_draw);
  492.   mm.m.header.magic = htons(MAGIC);
  493.   mm.m.header.type = htons(WHIP_MESSAGE);
  494.   mm.m.header.version = htons(VERSION);
  495.   mm.m.header.format = htons(DATA_FORMAT);
  496.   mm.m.id.pid = wf->pid;
  497.   mm.m.id.seq = wf->sequence;
  498.   mm.m.id.slave_no = s->no;
  499.   mm.m.id.chunk_no = c->no;
  500.  
  501.   if(c->slave_datalen > MAX_WORKPACKET_SIZE)
  502.     wf_error("work packet too large");
  503.   bcopy(c->slave_data, mm.m.data, c->slave_datalen);
  504.  
  505.   if(io_send(wf->io, (char *)&mm, sizeof(mm.m)+c->slave_datalen,
  506.          (char *) &s->name, (int) sizeof(s->name)) == -1)
  507.   { wf_warn("error sending datagram, use of affected server disabled");
  508.     s->disabled=1; /* consider this slave unusable */
  509.   }
  510.  
  511.   timeout_set(s); 
  512.   /* move the chunk from the head to the tail of the queue */
  513.   queue_delete(c);
  514.   queue_add(&wf->to_draw, c);
  515. }
  516.  
  517.  
  518. /* Handle a reply from a slave. */
  519.  
  520. static void 
  521. handle_reply_msg(closure, msg, msglen)
  522.   char *closure; /* really a wf_state* */
  523.   Message *msg;
  524.   int msglen; /*ARGSUSED*/
  525. { wf_state *wf = (wf_state *) closure;
  526.   char *client;
  527.   slave *s;
  528.   chunk *c;
  529.   
  530.   unsigned int pid = msg->reply.id.pid;
  531.   unsigned int seqno = msg->reply.id.seq;
  532.   unsigned int chunkno = msg->reply.id.chunk_no;
  533.   unsigned int slaveno = msg->reply.id.slave_no;
  534.   int late;
  535.   
  536.   if(pid != wf->pid) return;
  537.   if(slaveno >= wf->n_slaves) return;
  538.   s = wf->slaves[slaveno];
  539.   s->n_packets++;
  540.   if(seqno != wf->sequence) 
  541.   { s->n_late_packets++;
  542.     return;
  543.   }
  544.   if(chunkno >= wf->n_chunks) return;
  545.   c = wf->chunks[chunkno];
  546.   
  547.   timeout_unset(s);
  548.  
  549.   client = c->client;
  550.  
  551.   /*
  552.     The chunk is too late if the client has gone away or if the chunk
  553.     has been drawn already.
  554.   */
  555.   late = (!client || c->drawn);
  556.  
  557.   if(late) /* ignore the message if the chunk arrived too late */
  558.   { s->n_late_packets++;
  559.   }
  560.   else
  561.   { queue_delete(c);
  562.     queue_add(&wf->drawn, c);
  563.     c->drawn=1;
  564.   }
  565.  
  566.   /* Put the slave to work again */
  567.   whip_slave(wf, s);
  568.   
  569.   if(!late)
  570.   { wf_draw(client, c->client_data, (char *) &(msg->reply.data));
  571.     s->mi_count += ntohl(msg->reply.mi_count);
  572.   }
  573.  
  574.   /* If done, stop all the slaves and deallocate the chunks */
  575.   if(queue_empty(&wf->to_draw))
  576.       stop_slaves(wf);
  577. }
  578.  
  579.  
  580. /*
  581.   Prepare for dispatching new chunks.
  582. */
  583.  
  584. void 
  585. wf_begin_dispatch(wf)
  586.      wf_state *wf;
  587. {
  588.   wf->insert_point = wf->to_draw.next;
  589. }
  590.  
  591.  
  592. /*
  593.   This function is called by the Ms widget for each chunk it 
  594.   wants to be calculated.
  595. */
  596.  
  597. void 
  598. wf_dispatch_chunk(wf, client, client_data, client_datalen,
  599.           slave_data, slave_datalen)
  600.      wf_state *wf;
  601.      char *client;
  602.      char *client_data;
  603.      unsigned int client_datalen;
  604.      char *slave_data;
  605.      unsigned int slave_datalen;
  606. { int chunkno = wf->n_chunks++;
  607.   chunk *c = (chunk *) malloc(sizeof(chunk));
  608.   c->client_data = (char *) malloc(client_datalen);
  609.   bcopy(client_data, c->client_data, client_datalen);
  610.   c->slave_data = (char *) malloc(slave_datalen);
  611.   bcopy(slave_data, c->slave_data, slave_datalen);
  612.   c->slave_datalen = slave_datalen;
  613.   c->client = client;
  614.   c->drawn = 0;
  615.   c->no = chunkno;
  616.   /* grow the chunk index if necessary */
  617.   if(chunkno >=  wf->max_chunks)
  618.   { wf->max_chunks *= 2;
  619.     wf->chunks = (chunk **) realloc((char *) wf->chunks,
  620.                        wf->max_chunks*sizeof(chunk *));
  621.   }
  622.   wf->chunks[chunkno] = c;
  623.   queue_add(wf->insert_point, c);
  624. }
  625.  
  626.  
  627. /* Make sure all the slaves are put to work. */
  628.  
  629. void wf_restart(wf)
  630.   wf_state *wf;
  631. { int i;
  632.   for(i=0; i < wf->n_slaves; i++)
  633.   { whip_slave(wf, wf->slaves[i]);
  634.   }
  635. }
  636.  
  637.  
  638. /* Handle the situation of a client aborting prematurely. */
  639.  
  640. void wf_client_died(wf, cli)
  641.   wf_state *wf; char *cli;
  642. { chunk *c;
  643.   chunk *next_c;
  644.   /* Remove all chunks of the dead widget from the work queue */
  645.   /* and nullify their widget pointer fields so that late packets */
  646.   /* won't reference the nonexistent widget */
  647.   for(c=wf->to_draw.next; c != &wf->to_draw; c=next_c)
  648.   { /* need to use a temporary variable because */
  649.     /* relinking the chunk fouls up the .next field */
  650.     next_c=c->next; 
  651.     if(c->client == cli)
  652.     { c->client = NULL;
  653.       queue_delete(c);
  654.       queue_add(&wf->drawn, c);
  655.     }
  656.   }
  657. }
  658.  
  659.  
  660. /* Print performance statistics. */
  661.  
  662. void wf_print_stats(wf, f)
  663.   wf_state *wf; FILE *f;
  664. { int i;
  665.   int active=0;
  666.   unsigned long mi_tot=0;
  667.   fprintf(f, "\n%-22s %10s %10s %10s %10s\n",
  668.      "Host", "iterations", "packets", "timeouts", "late");
  669.   for(i=0; i<wf->n_slaves; i++)
  670.   { slave *s= wf->slaves[i];
  671.     fprintf(f, "%-22s %10lu %10u %10u %10u\n",
  672.        s->name_string, 
  673.        s->mi_count, s->n_packets, s->n_timeouts, s->n_late_packets);
  674.     if(s->mi_count)
  675.       active++;
  676.     mi_tot += s->mi_count;
  677.   }
  678.   fprintf(f, "%d servers, %d active, %lu iterations total\n",
  679.      wf->n_slaves, active, mi_tot);
  680.   fflush(f);
  681. }
  682.  
  683.  
  684. /* Make information about the maximum message size available. */
  685. unsigned
  686. wf_max_message_size()
  687. { return(DATAGRAM_BYTES-sizeof(ReplyHeader));
  688. }
  689.  
  690. #ifndef OLD_TIMEOUT
  691. void wf_tick(closure)
  692.   char *closure;
  693. { wf_state *wf = (wf_state *) closure;
  694.   int i;
  695.   unsigned n_slaves = wf->n_slaves;
  696.   long now = time((long *) 0);
  697.   for(i=0; i<n_slaves; i++)
  698.   { slave *s = s=wf->slaves[i];
  699.     if(s->has_timeout && s->timeout_at <= now)
  700.     { wf_timed_out((char *) s);
  701.     }
  702.   }
  703. }
  704. #endif
  705.  
  706. wf_main(wf)
  707.   wf_state *wf;
  708. { io_main(wf->io);
  709. }
  710.  
  711. void wf_done(wf)
  712.   wf_state *wf;
  713. { io_done(wf->io);
  714. }
  715.  
  716. io_state *wf_get_io(wf)
  717.   wf_state *wf;
  718. {
  719.   return wf->io;
  720. }
  721.